/*
 * Copyright 2014-2025 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.netflix.atlas.lwc.events

import com.netflix.atlas.core.model.Query
import com.netflix.atlas.core.util.SortedTagMap
import com.netflix.atlas.json.Json
import com.netflix.spectator.api.Clock
import com.netflix.spectator.api.ManualClock
import munit.FunSuite

import java.io.StringWriter
import scala.util.Using

class LwcEventClientSuite extends FunSuite {

  import LwcEventSuite.*
  import LwcEventClientSuite.*

  private val clock = new ManualClock()
  private val step = 5_000L

  override def beforeEach(context: BeforeEach): Unit = {
    clock.setWallTime(0L)
    clock.setMonotonicTime(0L)
  }

  private val sampleSpan: TestEvent = {
    TestEvent(SortedTagMap("app" -> "www", "node" -> "i-123"), 42L)
  }

  private val sampleLwcEvent: LwcEvent = LwcEvent(sampleSpan, extractSpanValue(sampleSpan))

  private def mkEvent(duration: Long): LwcEvent = {
    val event = TestEvent(SortedTagMap("app" -> "www", "node" -> "i-123"), duration)
    LwcEvent(event, extractSpanValue(event))
  }

  test("pass-through") {
    val subs = Subscriptions.fromTypedList(
      List(
        Subscription("1", 60000, "app,foo,:eq", Subscriptions.Events),
        Subscription("2", 60000, "app,www,:eq", Subscriptions.Events)
      )
    )
    val output = List.newBuilder[String]
    val client = LwcEventClient(subs, output.addOne, clock)
    client.process(sampleLwcEvent)
    assertEquals(
      List("""data: {"id":"2","event":{"tags":{"app":"www","node":"i-123"},"duration":42}}"""),
      output.result()
    )
  }

  test("sampled pass-through") {
    val subs = Subscriptions.fromTypedList(
      List(
        Subscription("1", step, "app,foo,:eq,(,app,),(,node,),:sample", Subscriptions.Events),
        Subscription("2", step, "app,www,:eq,(,app,),(,node,),:sample", Subscriptions.Events)
      )
    )
    val output = List.newBuilder[String]
    val client = LwcEventClient(subs, output.addOne, clock)
    client.process(sampleLwcEvent)
    clock.setWallTime(step)
    client.process(LwcEvent.HeartbeatLwcEvent(step))
    assertEquals(
      List(
        """data: {"id":"2","event":{"id":"2","tags":{"app":"www"},"timestamp":5000,"value":0.2,"samples":[["i-123"]]}}"""
      ),
      output.result()
    )
  }

  test("analytics, basic aggregate") {
    val subs = Subscriptions.fromTypedList(
      List(
        Subscription("1", step, "app,foo,:eq,:sum", Subscriptions.TimeSeries),
        Subscription("2", step, "app,www,:eq,:sum", Subscriptions.TimeSeries)
      )
    )
    val output = List.newBuilder[String]
    val client = LwcEventClient(subs, output.addOne, clock)
    client.process(sampleLwcEvent)
    clock.setWallTime(step)
    client.process(LwcEvent.HeartbeatLwcEvent(step))
    val vs = output.result()
    assertEquals(vs.size, 1)
  }

  test("analytics, basic aggregate extract value") {
    val subs = Subscriptions.fromTypedList(
      List(
        Subscription(
          "1",
          step,
          "app,www,:eq,value,duration,:eq,:and,:sum",
          Subscriptions.TimeSeries
        )
      )
    )
    val output = List.newBuilder[String]
    val client = LwcEventClient(subs, output.addOne, clock)
    client.process(sampleLwcEvent)
    clock.setWallTime(step)
    client.process(LwcEvent.HeartbeatLwcEvent(step))
    val vs = output.result()
    assertEquals(vs.size, 1)
    vs.foreach { v =>
      val event = parseDatapointEvent(v)
      assertEquals(event.tags, Map("app" -> "www", "value" -> "duration"))
      assertEqualsDouble(event.value, 8.4, 1e-12)
    }
  }

  test("analytics, basic aggregate with post filter") {
    val subs = Subscriptions.fromTypedList(
      List(
        Subscription(
          "1",
          step,
          "app,www,:eq,value,duration,:eq,:and,duration,43ns,:gt,:and,:sum",
          Subscriptions.TimeSeries
        )
      )
    )
    val output = List.newBuilder[String]
    val filter = TypedLwcEventFilter(Map("duration" -> TypedLwcEventFilter.DurationMatcher))
    val client = LwcEventClient(subs, output.addOne, clock, filter)
    (0 until 100).foreach { i =>
      val event = mkEvent(i)
      client.process(event)
    }
    clock.setWallTime(step)
    client.process(LwcEvent.HeartbeatLwcEvent(step))
    val vs = output.result()
    assertEquals(vs.size, 1)
    vs.foreach { v =>
      val event = parseDatapointEvent(v)
      assertEquals(event.tags, Map("app" -> "www", "value" -> "duration"))
      assertEqualsDouble(event.value, 800.8, 1e-12)
    }
  }

  test("analytics, basic aggregate extract value using custom key") {
    val subs = Subscriptions.fromTypedList(
      List(
        Subscription(
          "1",
          step,
          "app,www,:eq,custom,duration,:eq,:and,:sum",
          Subscriptions.TimeSeries
        )
      )
    )
    val output = List.newBuilder[String]
    val filter = TypedLwcEventFilter(Map.empty, "custom")
    val client = LwcEventClient(subs, output.addOne, clock, filter)
    client.process(sampleLwcEvent)
    clock.setWallTime(step)
    client.process(LwcEvent.HeartbeatLwcEvent(step))
    val vs = output.result()
    assertEquals(vs.size, 1)
    vs.foreach { v =>
      val event = parseDatapointEvent(v)
      assertEquals(event.tags, Map("app" -> "www", "custom" -> "duration"))
      assertEqualsDouble(event.value, 8.4, 1e-12)
    }
  }

  test("analytics, group by") {
    val subs = Subscriptions.fromTypedList(
      List(
        Subscription("1", step, "app,foo,:eq,:sum,(,node,),:by", Subscriptions.TimeSeries),
        Subscription("2", step, "app,www,:eq,:sum,(,node,),:by", Subscriptions.TimeSeries)
      )
    )
    val output = List.newBuilder[String]
    val client = LwcEventClient(subs, output.addOne, clock)
    client.process(sampleLwcEvent)
    clock.setWallTime(step)
    client.process(LwcEvent.HeartbeatLwcEvent(step))
    val vs = output.result()
    assertEquals(vs.size, 1)
    vs.foreach { v =>
      val event = parseDatapointEvent(v)
      assertEquals(event.tags, Map("app" -> "www", "node" -> "i-123"))
      assertEqualsDouble(event.value, 0.2, 1e-12)
    }
  }

  test("analytics, group by missing key") {
    val subs = Subscriptions.fromTypedList(
      List(
        Subscription("1", 60000, "app,www,:eq,:sum,(,foo,),:by", Subscriptions.TimeSeries)
      )
    )
    val output = List.newBuilder[String]
    val client = LwcEventClient(subs, output.addOne)
    client.process(sampleLwcEvent)
    assert(output.result().isEmpty)
  }

  test("analytics, sync") {
    val subs = Subscriptions.fromTypedList(
      List(
        Subscription("1", step, "app,foo,:eq,:sum", Subscriptions.TimeSeries),
        Subscription("2", step, "app,www,:eq,:sum", Subscriptions.TimeSeries)
      )
    )
    val output = List.newBuilder[String]
    val client = TestLwcEventClient(subs, output.addOne, clock)
    client.process(sampleLwcEvent)
    clock.setWallTime(step)
    client.process(LwcEvent.HeartbeatLwcEvent(step))
    assertEquals(output.result().size, 1)

    // Sync expressions, same set
    (2 until 10).foreach { i =>
      output.clear()
      client.sync(subs)
      client.process(sampleLwcEvent)
      clock.setWallTime(step * i)
      client.process(LwcEvent.HeartbeatLwcEvent(step * i))
      assertEquals(output.result().size, 1)
    }

    // Sync expressions, subset
    (10 until 20).foreach { i =>
      output.clear()
      client.sync(subs.copy(timeSeries = subs.timeSeries.tail))
      client.process(sampleLwcEvent)
      clock.setWallTime(step * i)
      client.process(LwcEvent.HeartbeatLwcEvent(step * i))
      assertEquals(output.result().size, 1)
    }
  }

  test("trace analytics, basic aggregate") {
    val subs = Subscriptions.fromTypedList(
      List(
        Subscription("1", step, "app,e,:eq,parent.app,b,:eq,:and", Subscriptions.TimeSeries)
      )
    )
    val output = List.newBuilder[String]
    val client = LwcEventClient(subs, output.addOne, clock)
    TraceLwcEvent.sampleTrace.foreach(client.process)
    clock.setWallTime(step)
    client.process(LwcEvent.HeartbeatLwcEvent(step))
    val vs = output.result()
    assertEquals(vs.size, 1)
    vs.foreach { v =>
      val event = parseDatapointEvent(v)
      assertEquals(event.tags, Map("app" -> "e", "parent.app" -> "b"))
      assertEqualsDouble(event.value, 0.6, 1e-12)
    }
  }

  test("trace analytics, basic aggregate extract value") {
    val subs = Subscriptions.fromTypedList(
      List(
        Subscription(
          "1",
          step,
          "app,e,:eq,value,duration,:eq,:and,:sum",
          Subscriptions.TimeSeries
        )
      )
    )
    val output = List.newBuilder[String]
    val client = LwcEventClient(subs, output.addOne, clock)
    TraceLwcEvent.sampleTrace.foreach(client.process)
    clock.setWallTime(step)
    client.process(LwcEvent.HeartbeatLwcEvent(step))
    val vs = output.result()
    assertEquals(vs.size, 1)
    vs.foreach { v =>
      val event = parseDatapointEvent(v)
      assertEquals(event.tags, Map("app" -> "e", "value" -> "duration"))
      assertEqualsDouble(event.value, 2.1e-8, 1e-12)
    }
  }

  test("trace analytics, using trace filter") {
    val subs = Subscriptions.fromTypedList(
      List(
        Subscription(
          "1",
          step,
          "duration,50ns,:gt,value,duration,:eq,:and,:sum",
          Subscriptions.TimeSeries
        )
      )
    )
    val output = List.newBuilder[String]
    val filter = TraceLwcEvent.TraceLwcEventFilter
    val client = LwcEventClient(subs, output.addOne, clock, filter)
    TraceLwcEvent.sampleTrace.foreach(client.process)
    clock.setWallTime(step)
    client.process(LwcEvent.HeartbeatLwcEvent(step))
    val vs = output.result()
    assertEquals(vs.size, 1)
    vs.foreach { v =>
      val event = parseDatapointEvent(v)
      assertEquals(event.tags, Map("value" -> "duration"))
      assertEqualsDouble(event.value, 4.88e-8, 1e-12)
    }
  }

  test("trace analytics, using trace filter, any predicate, one match") {
    val subs = Subscriptions.fromTypedList(
      List(
        Subscription(
          "1",
          step,
          "app,a,:eq,any.status,ERROR,:eq,:and,:sum",
          Subscriptions.TimeSeries
        )
      )
    )
    val output = List.newBuilder[String]
    val filter = TraceLwcEvent.TraceLwcEventFilter
    val client = LwcEventClient(subs, output.addOne, clock, filter)
    TraceLwcEvent.sampleTrace.foreach(client.process)
    clock.setWallTime(step)
    client.process(LwcEvent.HeartbeatLwcEvent(step))
    val vs = output.result()
    assertEquals(vs.size, 1)
    vs.foreach { v =>
      val event = parseDatapointEvent(v)
      assertEquals(event.tags, Map("app" -> "a", "any.status" -> "ERROR"))
      assertEqualsDouble(event.value, 0.2, 1e-12)
    }
  }

  test("trace analytics, using trace filter, any predicate, many matches") {
    val subs = Subscriptions.fromTypedList(
      List(
        Subscription(
          "1",
          step,
          "app,a,:eq,any.status,OK,:eq,:and,:sum",
          Subscriptions.TimeSeries
        )
      )
    )
    val output = List.newBuilder[String]
    val filter = TraceLwcEvent.TraceLwcEventFilter
    val client = LwcEventClient(subs, output.addOne, clock, filter)
    TraceLwcEvent.sampleTrace.foreach(client.process)
    clock.setWallTime(step)
    client.process(LwcEvent.HeartbeatLwcEvent(step))
    val vs = output.result()
    assertEquals(vs.size, 1)
    vs.foreach { v =>
      val event = parseDatapointEvent(v)
      assertEquals(event.tags, Map("app" -> "a", "any.status" -> "OK"))
      assertEqualsDouble(event.value, 0.2, 1e-12)
    }
  }

  test("trace analytics, using trace filter, any predicate, no matches") {
    val subs = Subscriptions.fromTypedList(
      List(
        Subscription(
          "1",
          step,
          "app,a,:eq,any.status,FOO,:eq,:and,:sum",
          Subscriptions.TimeSeries
        )
      )
    )
    val output = List.newBuilder[String]
    val filter = TraceLwcEvent.TraceLwcEventFilter
    val client = LwcEventClient(subs, output.addOne, clock, filter)
    TraceLwcEvent.sampleTrace.foreach(client.process)
    clock.setWallTime(step)
    client.process(LwcEvent.HeartbeatLwcEvent(step))
    val vs = output.result()
    assert(vs.isEmpty)
  }

  test("trace analytics, group by with parent attributes") {
    val subs = Subscriptions.fromTypedList(
      List(
        Subscription(
          "1",
          step,
          "app,e,:eq,(,app,parent.app,parent.parent.app,),:by",
          Subscriptions.TimeSeries
        )
      )
    )
    val output = List.newBuilder[String]
    val client = LwcEventClient(subs, output.addOne, clock)
    TraceLwcEvent.sampleTrace.foreach(client.process)
    clock.setWallTime(step)
    client.process(LwcEvent.HeartbeatLwcEvent(step))
    val vs = output.result()
    assertEquals(vs.size, 2)
    vs.foreach { v =>
      val event = parseDatapointEvent(v)
      if (event.tags == Map("app" -> "e", "parent.app" -> "b", "parent.parent.app" -> "a"))
        assertEqualsDouble(event.value, 0.6, 1e-12)
      else if (event.tags == Map("app" -> "e", "parent.app" -> "c", "parent.parent.app" -> "a"))
        assertEqualsDouble(event.value, 0.4, 1e-12)
      else
        fail(s"unexpected tags: ${event.tags}")
    }
  }

  test("check if event matches query") {
    val matching = Query.And(Query.Equal("app", "www"), Query.Equal("node", "i-123"))
    val nonMatching = Query.And(Query.Equal("app", "www"), Query.Equal("node", "i-124"))
    assert(matching.matches(sampleLwcEvent.tagValue _))
    assert(!nonMatching.matches(sampleLwcEvent.tagValue _))
  }
}

object LwcEventClientSuite {

  case class TestLwcEventClient(
    subscriptions: Subscriptions,
    consumer: String => Unit,
    clock: Clock
  ) extends AbstractLwcEventClient(clock, LwcEventFilter.default) {

    sync(subscriptions)

    override def sync(subscriptions: Subscriptions): Unit = {
      super.sync(subscriptions)
    }

    override def submit(id: String, event: LwcEvent): Unit = {
      Using.resource(new StringWriter()) { w =>
        Using.resource(Json.newJsonGenerator(w)) { gen =>
          gen.writeStartObject()
          gen.writeStringField("id", id)
          gen.writeFieldName("event")
          event.encode(gen)
          gen.writeEndObject()
        }
        consumer(s"data: ${w.toString}")
      }
    }
  }

  case class Message(id: String, event: DatapointEvent)

  private def parseDatapointEvent(str: String): DatapointEvent = {
    Json.decode[Message](str.substring("data: ".length)).event
  }
}
